1   package org.apache.solr.common.cloud;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one or more
5    * contributor license agreements.  See the NOTICE file distributed with
6    * this work for additional information regarding copyright ownership.
7    * The ASF licenses this file to You under the Apache License, Version 2.0
8    * (the "License"); you may not use this file except in compliance with
9    * the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
 * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  import java.io.Closeable;
21  import java.io.Serializable;
22  import java.io.UnsupportedEncodingException;
23  import java.lang.invoke.MethodHandles;
24  import java.net.URLDecoder;
25  import java.util.ArrayList;
26  import java.util.Collection;
27  import java.util.Collections;
28  import java.util.HashSet;
29  import java.util.LinkedHashMap;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.Map.Entry;
33  import java.util.Set;
34  import java.util.concurrent.ConcurrentHashMap;
35  import java.util.concurrent.ThreadFactory;
36  import java.util.concurrent.TimeUnit;
37  
38  import org.apache.solr.common.Callable;
39  import org.apache.solr.common.SolrException;
40  import org.apache.solr.common.SolrException.ErrorCode;
41  import org.apache.solr.common.util.Pair;
42  import org.apache.solr.common.util.Utils;
43  import org.apache.zookeeper.CreateMode;
44  import org.apache.zookeeper.KeeperException;
45  import org.apache.zookeeper.WatchedEvent;
46  import org.apache.zookeeper.Watcher;
47  import org.apache.zookeeper.Watcher.Event.EventType;
48  import org.apache.zookeeper.data.Stat;
49  import org.slf4j.Logger;
50  import org.slf4j.LoggerFactory;
51  
52  import static java.util.Arrays.asList;
53  import static java.util.Collections.EMPTY_MAP;
54  import static java.util.Collections.emptyMap;
55  import static java.util.Collections.emptySet;
56  import static java.util.Collections.unmodifiableSet;
57  import static org.apache.solr.common.util.Utils.fromJSON;
58  
59  public class ZkStateReader implements Closeable {
  /** Logger for this class; the class is resolved through {@link MethodHandles#lookup()}. */
  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
  
  // String keys for node, replica, shard and collection properties.
  // NOTE(review): most of these are not referenced in this part of the file; their usage is presumed from the names.
  public static final String BASE_URL_PROP = "base_url";
  public static final String NODE_NAME_PROP = "node_name";
  public static final String CORE_NODE_NAME_PROP = "core_node_name";
  public static final String ROLES_PROP = "roles";
  public static final String STATE_PROP = "state";
  public static final String CORE_NAME_PROP = "core";
  public static final String COLLECTION_PROP = "collection";
  public static final String ELECTION_NODE_PROP = "election_node";
  public static final String SHARD_ID_PROP = "shard";
  public static final String REPLICA_PROP = "replica";
  public static final String SHARD_RANGE_PROP = "shard_range";
  public static final String SHARD_STATE_PROP = "shard_state";
  public static final String SHARD_PARENT_PROP = "shard_parent";
  public static final String NUM_SHARDS_PROP = "numShards";
  public static final String LEADER_PROP = "leader";
  public static final String PROPERTY_PROP = "property";
  public static final String PROPERTY_VALUE_PROP = "property.value";
  public static final String MAX_AT_ONCE_PROP = "maxAtOnce";
  public static final String MAX_WAIT_SECONDS_PROP = "maxWaitSeconds"; 
  // ZooKeeper node paths read by this class.
  public static final String COLLECTIONS_ZKNODE = "/collections";
  public static final String LIVE_NODES_ZKNODE = "/live_nodes";
  public static final String ALIASES = "/aliases.json";
  public static final String CLUSTER_STATE = "/clusterstate.json";
  public static final String CLUSTER_PROPS = "/clusterprops.json";
  // A property key rather than a path; it merely sits among the path constants.
  public static final String REJOIN_AT_HEAD_PROP = "rejoinAtHead";
  public static final String SOLR_SECURITY_CONF_PATH = "/security.json";

  // Setting names. AUTO_ADD_REPLICAS is additionally accepted as a cluster property (see KNOWN_CLUSTER_PROPS).
  public static final String REPLICATION_FACTOR = "replicationFactor";
  public static final String MAX_SHARDS_PER_NODE = "maxShardsPerNode";
  public static final String AUTO_ADD_REPLICAS = "autoAddReplicas";

  // Path-shaped name; not referenced in this part of the file.
  public static final String ROLES = "/roles.json";

  /** Parent path under which {@link #readConfigName(String)} expects the named config to exist. */
  public static final String CONFIGS_ZKNODE = "/configs";
  /** Property of a collection's znode that names its config; read by {@link #readConfigName(String)}. */
  public final static String CONFIGNAME_PROP="configName";

  // Cluster property names; both are members of KNOWN_CLUSTER_PROPS.
  public static final String LEGACY_CLOUD = "legacyCloud";

  public static final String URL_SCHEME = "urlScheme";
101 
  /** A view of the current state of all collections; combines all the different state sources into a single view. */
  protected volatile ClusterState clusterState;

  /** Sleep between two leader lookups in {@code getLeaderRetry}, in milliseconds. */
  private static final int GET_LEADER_RETRY_INTERVAL_MS = 50;
  /** Timeout applied by the two-argument {@code getLeaderRetry}, in milliseconds. */
  private static final int GET_LEADER_RETRY_DEFAULT_TIMEOUT = 4000;

  /** Path segment used by {@code getShardLeadersElectPath} below a collection's node. */
  public static final String LEADER_ELECT_ZKNODE = "leader_elect";

  /** Path segment used by {@code getShardLeadersPath} below a collection's node. */
  public static final String SHARD_LEADERS_ZKNODE = "leaders";
  /** Last path segment appended by {@code getShardLeadersElectPath} when a shard id is given. */
  public static final String ELECTION_NODE = "election";

  /** Collections we actively care about, and will try to keep watch on. */
  private final Set<String> interestingCollections = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());

  /** Collections tracked in the legacy (shared) state format, reflects the contents of clusterstate.json. */
  private Map<String, ClusterState.CollectionRef> legacyCollectionStates = emptyMap();
  /** Last seen ZK version of clusterstate.json. */
  private int legacyClusterStateVersion = 0;

  /** Collections with format2 state.json, "interesting" and actively watched. */
  private final ConcurrentHashMap<String, DocCollection> watchedCollectionStates = new ConcurrentHashMap<String, DocCollection>();

  /** Collections with format2 state.json, not "interesting" and not actively watched. */
  private final ConcurrentHashMap<String, LazyCollectionRef> lazyCollectionStates = new ConcurrentHashMap<String, LazyCollectionRef>();

  /** Most recently read children of {@link #LIVE_NODES_ZKNODE}; replaced wholesale by {@code refreshLiveNodes}. */
  private volatile Set<String> liveNodes = emptySet();

  /** Config manager built on the same ZooKeeper client; exposed via {@link #getConfigManager()}. */
  private final ZkConfigManager configManager;

  /** Last security data read from {@link #SOLR_SECURITY_CONF_PATH}; stays {@code null} until it is first loaded. */
  private ConfigData securityData;

  /** Callback run after the security watcher has stored new data; {@code null} disables the security watcher. */
  private final Runnable securityNodeListener;

  /** The only property names {@code setClusterProperty} accepts. */
  public static final Set<String> KNOWN_CLUSTER_PROPS = unmodifiableSet(new HashSet<>(asList(
      LEGACY_CLOUD,
      URL_SCHEME,
      AUTO_ADD_REPLICAS)));
139 
140   /**
141    * Returns config set name for collection.
142    *
143    * @param collection to return config set name for
144    */
145   public String readConfigName(String collection) {
146 
147     String configName = null;
148 
149     String path = COLLECTIONS_ZKNODE + "/" + collection;
150     if (log.isInfoEnabled()) {
151       log.info("Load collection config from:" + path);
152     }
153 
154     try {
155       byte[] data = zkClient.getData(path, null, null, true);
156 
157       if(data != null) {
158         ZkNodeProps props = ZkNodeProps.load(data);
159         configName = props.getStr(CONFIGNAME_PROP);
160       }
161 
162       if (configName != null) {
163         if (!zkClient.exists(CONFIGS_ZKNODE + "/" + configName, true)) {
164           log.error("Specified config does not exist in ZooKeeper:" + configName);
165           throw new ZooKeeperException(ErrorCode.SERVER_ERROR,
166               "Specified config does not exist in ZooKeeper:" + configName);
167         } else if (log.isInfoEnabled()) {
168           log.info("path={} {}={} specified config exists in ZooKeeper",
169               new Object[] {path, CONFIGNAME_PROP, configName});
170         }
171       } else {
172         throw new ZooKeeperException(ErrorCode.INVALID_STATE, "No config data found at path: " + path);
173       }
174     }
175     catch (KeeperException e) {
176       throw new SolrException(ErrorCode.SERVER_ERROR, "Error loading config name for collection " + collection, e);
177     }
178     catch (InterruptedException e) {
179       Thread.interrupted();
180       throw new SolrException(ErrorCode.SERVER_ERROR, "Error loading config name for collection " + collection, e);
181     }
182 
183     return configName;
184   }
185 
186 
187   private static class ZKTF implements ThreadFactory {
188     private static ThreadGroup tg = new ThreadGroup("ZkStateReader");
189     @Override
190     public Thread newThread(Runnable r) {
191       Thread td = new Thread(tg, r);
192       td.setDaemon(true);
193       return td;
194     }
195   }
196 
  /** ZooKeeper client used for every read and write performed by this reader. */
  private final SolrZkClient zkClient;
  
  /** Whether {@link #close()} also closes {@link #zkClient}; true only when this reader created the client itself. */
  private final boolean closeClient;

  /** Current aliases; replaced wholesale by {@link #updateAliases()} and by the aliases watcher. */
  private volatile Aliases aliases = new Aliases();

  /** Set by {@link #close()}; makes the retry loop in {@code getLeaderRetry} stop early. */
  private volatile boolean closed = false;
204 
205   public ZkStateReader(SolrZkClient zkClient) {
206     this(zkClient, null);
207   }
208 
209   public ZkStateReader(SolrZkClient zkClient, Runnable securityNodeListener) {
210     this.zkClient = zkClient;
211     this.configManager = new ZkConfigManager(zkClient);
212     this.closeClient = false;
213     this.securityNodeListener = securityNodeListener;
214   }
215 
216 
217   public ZkStateReader(String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout) {
218     this.zkClient = new SolrZkClient(zkServerAddress, zkClientTimeout, zkClientConnectTimeout,
219         // on reconnect, reload cloud info
220         new OnReconnect() {
221           @Override
222           public void command() {
223             try {
224               ZkStateReader.this.createClusterStateWatchersAndUpdate();
225             } catch (KeeperException e) {
226               log.error("", e);
227               throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
228                   "", e);
229             } catch (InterruptedException e) {
230               // Restore the interrupted status
231               Thread.currentThread().interrupt();
232               log.error("", e);
233               throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
234                   "", e);
235             }
236           }
237         });
238     this.configManager = new ZkConfigManager(zkClient);
239     this.closeClient = true;
240     this.securityNodeListener = null;
241   }
242 
243   public ZkConfigManager getConfigManager() {
244     return configManager;
245   }
246 
247   /**
248    * Forcibly refresh cluster state from ZK. Do this only to avoid race conditions because it's expensive.
249    */
250   public void updateClusterState() throws KeeperException, InterruptedException {
251     synchronized (getUpdateLock()) {
252       if (clusterState == null) {
253         // Never initialized, just run normal initialization.
254         createClusterStateWatchersAndUpdate();
255         return;
256       }
257       // No need to set watchers because we should already have watchers registered for everything.
258       refreshLegacyClusterState(null);
259       // Need a copy so we don't delete from what we're iterating over.
260       Collection<String> safeCopy = new ArrayList<>(watchedCollectionStates.keySet());
261       for (String coll : safeCopy) {
262         DocCollection newState = fetchCollectionState(coll, null);
263         updateWatchedCollection(coll, newState);
264       }
265       refreshCollectionList(null);
266       refreshLiveNodes(null);
267       constructState();
268     }
269   }
270 
271   /** Refresh the set of live nodes. */
272   public void updateLiveNodes() throws KeeperException, InterruptedException {
273     refreshLiveNodes(null);
274   }
275   
276   public Aliases getAliases() {
277     return aliases;
278   }
279 
280   public Integer compareStateVersions(String coll, int version) {
281     DocCollection collection = clusterState.getCollectionOrNull(coll);
282     if (collection == null) return null;
283     if (collection.getZNodeVersion() < version) {
284       log.debug("server older than client {}<{}", collection.getZNodeVersion(), version);
285       DocCollection nu = getCollectionLive(this, coll);
286       if (nu == null) return -1 ;
287       if (nu.getZNodeVersion() > collection.getZNodeVersion()) {
288         updateWatchedCollection(coll, nu);
289         collection = nu;
290       }
291     }
292     
293     if (collection.getZNodeVersion() == version) {
294       return null;
295     }
296     
297     log.debug("wrong version from client {}!={} ", version, collection.getZNodeVersion());
298     
299     return collection.getZNodeVersion();
300   }
301   
  /**
   * Loads the complete cluster view from ZooKeeper and registers the watchers that keep it current.
   *
   * <p>In order: checks that {@link #CLUSTER_STATE} exists, refreshes the legacy cluster state,
   * the format2 collections, the collection list and the live nodes, rebuilds
   * {@link #clusterState}, installs a watcher on {@link #ALIASES} and loads the aliases. If a
   * security node listener was supplied, a watcher on {@link #SOLR_SECURITY_CONF_PATH} is
   * installed too and the security data is read once.
   *
   * @throws SolrException with {@code SERVICE_UNAVAILABLE} if {@link #CLUSTER_STATE} does not exist
   * @throws KeeperException propagated from the ZooKeeper calls
   * @throws InterruptedException propagated from the ZooKeeper calls
   */
  public synchronized void createClusterStateWatchersAndUpdate() throws KeeperException,
      InterruptedException {
    // We need to fetch the current cluster state and the set of live nodes

    log.info("Updating cluster state from ZooKeeper... ");

    // Sanity check ZK structure.
    if (!zkClient.exists(CLUSTER_STATE, true)) {
      throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
              "Cannot connect to cluster at " + zkClient.getZkServerAddress() + ": cluster not found/not ready");
    }

    // on reconnect of SolrZkClient force refresh and re-add watches.
    refreshLegacyClusterState(new LegacyClusterStateWatcher());
    refreshStateFormat2Collections();
    refreshCollectionList(new CollectionsChildWatcher());
    refreshLiveNodes(new LiveNodeWatcher());

    // getUpdateLock() returns this instance and the method is already synchronized on it,
    // so this block re-enters a monitor the thread already holds.
    synchronized (ZkStateReader.this.getUpdateLock()) {
      constructState();

      // Watch the aliases node; the watcher re-registers itself whenever it reloads the data.
      zkClient.exists(ALIASES,
          new Watcher() {
            
            @Override
            public void process(WatchedEvent event) {
              // session events are not change events,
              // and do not remove the watcher
              if (EventType.None.equals(event.getType())) {
                return;
              }
              try {
                synchronized (ZkStateReader.this.getUpdateLock()) {
                  log.info("Updating aliases... ");

                  // remake watch
                  final Watcher thisWatch = this;
                  Stat stat = new Stat();
                  byte[] data = zkClient.getData(ALIASES, thisWatch, stat ,
                      true);

                  Aliases aliases = ClusterState.load(data);

                  ZkStateReader.this.aliases = aliases;
                }
              } catch (KeeperException e) {
                // Losing the session or connection is only warned about; anything else is rethrown.
                if (e.code() == KeeperException.Code.SESSIONEXPIRED
                    || e.code() == KeeperException.Code.CONNECTIONLOSS) {
                  log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
                  return;
                }
                log.error("", e);
                throw new ZooKeeperException(
                    SolrException.ErrorCode.SERVER_ERROR, "", e);
              } catch (InterruptedException e) {
                // Restore the interrupted status
                Thread.currentThread().interrupt();
                log.warn("", e);
                return;
              }
            }
            
          }, true);
    }
    // Initial load of the aliases; the watcher above only fires on later events.
    updateAliases();

    if (securityNodeListener != null) {
      addSecuritynodeWatcher(SOLR_SECURITY_CONF_PATH, new Callable<Pair<byte[], Stat>>() {
        @Override
        public void call(Pair<byte[], Stat> pair) {
          ConfigData cd = new ConfigData();
          // A null or empty payload becomes EMPTY_MAP; otherwise the parsed JSON is passed through Utils.getDeepCopy.
          cd.data = pair.getKey() == null || pair.getKey().length == 0 ? EMPTY_MAP : Utils.getDeepCopy((Map) fromJSON(pair.getKey()), 4, false);
          // -1 stands in for the version when no Stat is supplied.
          cd.version = pair.getValue() == null ? -1 : pair.getValue().getVersion();
          // Publish the new data before notifying the listener.
          securityData = cd;
          securityNodeListener.run();
        }
      });
      securityData = getSecurityProps(true);
    }
  }
382 
383   private void addSecuritynodeWatcher(final String path, final Callable<Pair<byte[], Stat>> callback)
384       throws KeeperException, InterruptedException {
385     zkClient.exists(SOLR_SECURITY_CONF_PATH,
386         new Watcher() {
387 
388           @Override
389           public void process(WatchedEvent event) {
390             // session events are not change events,
391             // and do not remove the watcher
392             if (EventType.None.equals(event.getType())) {
393               return;
394             }
395             try {
396               synchronized (ZkStateReader.this.getUpdateLock()) {
397                 log.info("Updating {} ... ", path);
398 
399                 // remake watch
400                 final Watcher thisWatch = this;
401                 Stat stat = new Stat();
402                 byte[] data = getZkClient().getData(path, thisWatch, stat, true);
403                 try {
404                   callback.call(new Pair<>(data, stat));
405                 } catch (Exception e) {
406                   if (e instanceof KeeperException) throw (KeeperException) e;
407                   if (e instanceof InterruptedException) throw (InterruptedException) e;
408                   log.error("Error running collections node listener", e);
409                 }
410               }
411             } catch (KeeperException e) {
412               if (e.code() == KeeperException.Code.SESSIONEXPIRED
413                   || e.code() == KeeperException.Code.CONNECTIONLOSS) {
414                 log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
415                 return;
416               }
417               log.error("", e);
418               throw new ZooKeeperException(
419                   ErrorCode.SERVER_ERROR, "", e);
420             } catch (InterruptedException e) {
421               // Restore the interrupted status
422               Thread.currentThread().interrupt();
423               log.warn("", e);
424               return;
425             }
426           }
427 
428         }, true);
429   }
430 
431   /**
432    * Construct the total state view from all sources.
433    * Must hold {@link #getUpdateLock()} before calling this.
434    */
435   private void constructState() {
436     // Legacy clusterstate is authoritative, for backwards compatibility.
437     // To move a collection's state to format2, first create the new state2 format node, then remove legacy entry.
438     Map<String, ClusterState.CollectionRef> result = new LinkedHashMap<>(legacyCollectionStates);
439 
440     // Are there any interesting collections that disappeared from the legacy cluster state?
441     for (String coll : interestingCollections) {
442       if (!result.containsKey(coll) && !watchedCollectionStates.containsKey(coll)) {
443         new StateWatcher(coll).refreshAndWatch(true);
444       }
445     }
446   
447     // Add state format2 collections, but don't override legacy collection states.
448     for (Map.Entry<String, DocCollection> entry : watchedCollectionStates.entrySet()) {
449       if (!result.containsKey(entry.getKey())) {
450         result.put(entry.getKey(), new ClusterState.CollectionRef(entry.getValue()));
451       }
452     }
453 
454     // Finally, add any lazy collections that aren't already accounted for.
455     for (Map.Entry<String, LazyCollectionRef> entry : lazyCollectionStates.entrySet()) {
456       if (!result.containsKey(entry.getKey())) {
457         result.put(entry.getKey(), entry.getValue());
458       }
459     }
460 
461     this.clusterState = new ClusterState(liveNodes, result, legacyClusterStateVersion);
462     log.debug("clusterStateSet: version {} legacy {} interesting {} watched {} lazy {} total {}",
463         clusterState.getZkClusterStateVersion(),
464         legacyCollectionStates.keySet(),
465         interestingCollections,
466         watchedCollectionStates.keySet(),
467         lazyCollectionStates.keySet(),
468         clusterState.getCollections());
469   }
470 
471   /**
472    * Refresh legacy (shared) clusterstate.json
473    */
474   private void refreshLegacyClusterState(Watcher watcher)
475       throws KeeperException, InterruptedException {
476     try {
477       Stat stat = new Stat();
478       byte[] data = zkClient.getData(CLUSTER_STATE, watcher, stat, true);
479       ClusterState loadedData = ClusterState.load(stat.getVersion(), data, Collections.<String>emptySet(), CLUSTER_STATE);
480       synchronized (getUpdateLock()) {
481         this.legacyCollectionStates = loadedData.getCollectionStates();
482         this.legacyClusterStateVersion = stat.getVersion();
483       }
484     } catch (KeeperException.NoNodeException e) {
485       // Ignore missing legacy clusterstate.json.
486       synchronized (getUpdateLock()) {
487         this.legacyCollectionStates = emptyMap();
488         this.legacyClusterStateVersion = 0;
489       }
490     }
491   }
492 
493   /**
494    * Refresh state format2 collections.
495    */
496   private void refreshStateFormat2Collections() {
497     // It's okay if no format2 state.json exists, if one did not previous exist.
498     for (String coll : interestingCollections) {
499       new StateWatcher(coll).refreshAndWatch(watchedCollectionStates.containsKey(coll));
500     }
501   }
502 
503   /**
504    * Search for any lazy-loadable state format2 collections.
505    *
506    * A stateFormat=1 collection which is not interesting to us can also
507    * be put into the {@link #lazyCollectionStates} map here. But that is okay
508    * because {@link #constructState()} will give priority to collections in the
509    * shared collection state over this map.
510    * In fact this is a clever way to avoid doing a ZK exists check on
511    * the /collections/collection_name/state.json znode
512    * Such an exists check is done in {@link ClusterState#hasCollection(String)} and
513    * {@link ClusterState#getCollections()} method as a safeguard against exposing wrong collection names to the users
514    */
515   private void refreshCollectionList(Watcher watcher) throws KeeperException, InterruptedException {
516     List<String> children = null;
517     try {
518       children = zkClient.getChildren(COLLECTIONS_ZKNODE, watcher, true);
519     } catch (KeeperException.NoNodeException e) {
520       log.warn("Error fetching collection names");
521       // fall through
522     }
523     if (children == null || children.isEmpty()) {
524       lazyCollectionStates.clear();
525       return;
526     }
527 
528     // Don't mess with watchedCollections, they should self-manage.
529 
530     // First, drop any children that disappeared.
531     this.lazyCollectionStates.keySet().retainAll(children);
532     for (String coll : children) {
533       // We will create an eager collection for any interesting collections, so don't add to lazy.
534       if (!interestingCollections.contains(coll)) {
535         // Double check contains just to avoid allocating an object.
536         LazyCollectionRef existing = lazyCollectionStates.get(coll);
537         if (existing == null) {
538           lazyCollectionStates.putIfAbsent(coll, new LazyCollectionRef(coll));
539         }
540       }
541     }
542   }
543 
544   private class LazyCollectionRef extends ClusterState.CollectionRef {
545 
546     private final String collName;
547 
548     public LazyCollectionRef(String collName) {
549       super(null);
550       this.collName = collName;
551     }
552 
553     @Override
554     public DocCollection get() {
555       // TODO: consider limited caching
556       return getCollectionLive(ZkStateReader.this, collName);
557     }
558 
559     @Override
560     public boolean isLazilyLoaded() {
561       return true;
562     }
563 
564     @Override
565     public String toString() {
566       return "LazyCollectionRef(" + collName + ")";
567     }
568   }
569 
570   /**
571    * Refresh live_nodes.
572    */
573   private void refreshLiveNodes(Watcher watcher) throws KeeperException, InterruptedException {
574     Set<String> newLiveNodes;
575     try {
576       List<String> nodeList = zkClient.getChildren(LIVE_NODES_ZKNODE, watcher, true);
577       log.debug("Updating live nodes from ZooKeeper... ({})", nodeList.size());
578       newLiveNodes = new HashSet<>(nodeList);
579     } catch (KeeperException.NoNodeException e) {
580       newLiveNodes = emptySet();
581     }
582     synchronized (getUpdateLock()) {
583       this.liveNodes = newLiveNodes;
584       if (clusterState != null) {
585         clusterState.setLiveNodes(newLiveNodes);
586       }
587     }
588   }
589 
590   /**
591    * @return information about the cluster from ZooKeeper
592    */
593   public ClusterState getClusterState() {
594     return clusterState;
595   }
596   
597   public Object getUpdateLock() {
598     return this;
599   }
600 
601   public void close() {
602     this.closed  = true;
603     if (closeClient) {
604       zkClient.close();
605     }
606   }
607   
608   public String getLeaderUrl(String collection, String shard, int timeout)
609       throws InterruptedException, KeeperException {
610     ZkCoreNodeProps props = new ZkCoreNodeProps(getLeaderRetry(collection,
611         shard, timeout));
612     return props.getCoreUrl();
613   }
614 
615   public Replica getLeader(String collection, String shard) throws InterruptedException {
616     if (clusterState != null) {
617       Replica replica = clusterState.getLeader(collection, shard);
618       if (replica != null && getClusterState().liveNodesContain(replica.getNodeName())) {
619         return replica;
620       }
621     }
622     return null;
623   }
624 
625   /**
626    * Get shard leader properties, with retry if none exist.
627    */
628   public Replica getLeaderRetry(String collection, String shard) throws InterruptedException {
629     return getLeaderRetry(collection, shard, GET_LEADER_RETRY_DEFAULT_TIMEOUT);
630   }
631 
632   /**
633    * Get shard leader properties, with retry if none exist.
634    */
635   public Replica getLeaderRetry(String collection, String shard, int timeout) throws InterruptedException {
636     long timeoutAt = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS);
637     while (true) {
638       Replica leader = getLeader(collection, shard);
639       if (leader != null) return leader;
640       if (System.nanoTime() >= timeoutAt || closed) break;
641       Thread.sleep(GET_LEADER_RETRY_INTERVAL_MS);
642     }
643     throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "No registered leader was found after waiting for "
644         + timeout + "ms " + ", collection: " + collection + " slice: " + shard);
645   }
646 
647   /**
648    * Get path where shard leader properties live in zookeeper.
649    */
650   public static String getShardLeadersPath(String collection, String shardId) {
651     return COLLECTIONS_ZKNODE + "/" + collection + "/"
652         + SHARD_LEADERS_ZKNODE + (shardId != null ? ("/" + shardId)
653         : "") + "/leader";
654   }
655 
656   /**
657    * Get path where shard leader elections ephemeral nodes are.
658    */
659   public static String getShardLeadersElectPath(String collection, String shardId) {
660     return COLLECTIONS_ZKNODE + "/" + collection + "/"
661         + LEADER_ELECT_ZKNODE  + (shardId != null ? ("/" + shardId + "/" + ELECTION_NODE)
662         : "");
663   }
664 
665 
666   public List<ZkCoreNodeProps> getReplicaProps(String collection, String shardId, String thisCoreNodeName) {
667     return getReplicaProps(collection, shardId, thisCoreNodeName, null);
668   }
669   
670   public List<ZkCoreNodeProps> getReplicaProps(String collection, String shardId, String thisCoreNodeName,
671       Replica.State mustMatchStateFilter) {
672     return getReplicaProps(collection, shardId, thisCoreNodeName, mustMatchStateFilter, null);
673   }
674   
675   public List<ZkCoreNodeProps> getReplicaProps(String collection, String shardId, String thisCoreNodeName,
676       Replica.State mustMatchStateFilter, Replica.State mustNotMatchStateFilter) {
677     assert thisCoreNodeName != null;
678     ClusterState clusterState = this.clusterState;
679     if (clusterState == null) {
680       return null;
681     }
682     Map<String,Slice> slices = clusterState.getSlicesMap(collection);
683     if (slices == null) {
684       throw new ZooKeeperException(ErrorCode.BAD_REQUEST,
685           "Could not find collection in zk: " + collection + " "
686               + clusterState.getCollections());
687     }
688     
689     Slice replicas = slices.get(shardId);
690     if (replicas == null) {
691       throw new ZooKeeperException(ErrorCode.BAD_REQUEST, "Could not find shardId in zk: " + shardId);
692     }
693     
694     Map<String,Replica> shardMap = replicas.getReplicasMap();
695     List<ZkCoreNodeProps> nodes = new ArrayList<>(shardMap.size());
696     for (Entry<String,Replica> entry : shardMap.entrySet()) {
697       ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(entry.getValue());
698       
699       String coreNodeName = entry.getValue().getName();
700       
701       if (clusterState.liveNodesContain(nodeProps.getNodeName()) && !coreNodeName.equals(thisCoreNodeName)) {
702         if (mustMatchStateFilter == null || mustMatchStateFilter == Replica.State.getState(nodeProps.getState())) {
703           if (mustNotMatchStateFilter == null || mustNotMatchStateFilter != Replica.State.getState(nodeProps.getState())) {
704             nodes.add(nodeProps);
705           }
706         }
707       }
708     }
709     if (nodes.size() == 0) {
710       // no replicas
711       return null;
712     }
713 
714     return nodes;
715   }
716 
717   public SolrZkClient getZkClient() {
718     return zkClient;
719   }
720 
721   public void updateAliases() throws KeeperException, InterruptedException {
722     byte[] data = zkClient.getData(ALIASES, null, null, true);
723 
724     Aliases aliases = ClusterState.load(data);
725 
726     ZkStateReader.this.aliases = aliases;
727   }
728   public Map getClusterProps(){
729     Map result = null;
730     try {
731       if(getZkClient().exists(ZkStateReader.CLUSTER_PROPS, true)){
732         result = (Map) Utils.fromJSON(getZkClient().getData(ZkStateReader.CLUSTER_PROPS, null, new Stat(), true)) ;
733       } else {
734         result= new LinkedHashMap();
735       }
736       return result;
737     } catch (Exception e) {
738       throw new SolrException(ErrorCode.SERVER_ERROR,"Error reading cluster properties",e) ;
739     }
740   }
741 
742   /**
743    * This method sets a cluster property.
744    *
745    * @param propertyName  The property name to be set.
746    * @param propertyValue The value of the property.
747    */
748   public void setClusterProperty(String propertyName, String propertyValue) {
749     if (!KNOWN_CLUSTER_PROPS.contains(propertyName)) {
750       throw new SolrException(ErrorCode.BAD_REQUEST, "Not a known cluster property " + propertyName);
751     }
752 
753     for (; ; ) {
754       Stat s = new Stat();
755       try {
756         if (getZkClient().exists(CLUSTER_PROPS, true)) {
757           int v = 0;
758           Map properties = (Map) Utils.fromJSON(getZkClient().getData(CLUSTER_PROPS, null, s, true));
759           if (propertyValue == null) {
760             //Don't update ZK unless absolutely necessary.
761             if (properties.get(propertyName) != null) {
762               properties.remove(propertyName);
763               getZkClient().setData(CLUSTER_PROPS, Utils.toJSON(properties), s.getVersion(), true);
764             }
765           } else {
766             //Don't update ZK unless absolutely necessary.
767             if (!propertyValue.equals(properties.get(propertyName))) {
768               properties.put(propertyName, propertyValue);
769               getZkClient().setData(CLUSTER_PROPS, Utils.toJSON(properties), s.getVersion(), true);
770             }
771           }
772         } else {
773           Map properties = new LinkedHashMap();
774           properties.put(propertyName, propertyValue);
775           getZkClient().create(CLUSTER_PROPS, Utils.toJSON(properties), CreateMode.PERSISTENT, true);
776         }
777       } catch (KeeperException.BadVersionException bve) {
778         log.warn("Race condition while trying to set a new cluster prop on current version " + s.getVersion());
779         //race condition
780         continue;
781       } catch (KeeperException.NodeExistsException nee) {
782         log.warn("Race condition while trying to set a new cluster prop on current version " + s.getVersion());
783         //race condition
784         continue;
785       } catch (Exception ex) {
786         log.error("Error updating path " + CLUSTER_PROPS, ex);
787         throw new SolrException(ErrorCode.SERVER_ERROR, "Error updating cluster property " + propertyName, ex);
788       }
789       break;
790     }
791   }
792 
793 
794 
795   /**
796    * Returns the content of /security.json from ZooKeeper as a Map
797    * If the files doesn't exist, it returns null.
798    */
799   public ConfigData getSecurityProps(boolean getFresh) {
800     if (!getFresh) {
801       if (securityData == null) return new ConfigData(EMPTY_MAP, -1);
802       return new ConfigData(securityData.data, securityData.version);
803     }
804     try {
805       Stat stat = new Stat();
806       if(getZkClient().exists(SOLR_SECURITY_CONF_PATH, true)) {
807         byte[] data = getZkClient()
808             .getData(ZkStateReader.SOLR_SECURITY_CONF_PATH, null, stat, true);
809         return data != null && data.length > 0 ?
810             new ConfigData((Map<String, Object>) Utils.fromJSON(data), stat.getVersion()) :
811             null;
812       }
813     } catch (KeeperException | InterruptedException e) {
814       throw new SolrException(ErrorCode.SERVER_ERROR,"Error reading security properties",e) ;
815     }
816     return null;
817   }
818   /**
819    * Returns the baseURL corresponding to a given node's nodeName --
820    * NOTE: does not (currently) imply that the nodeName (or resulting 
821    * baseURL) exists in the cluster.
822    * @lucene.experimental
823    */
824   public String getBaseUrlForNodeName(final String nodeName) {
825     final int _offset = nodeName.indexOf("_");
826     if (_offset < 0) {
827       throw new IllegalArgumentException("nodeName does not contain expected '_' seperator: " + nodeName);
828     }
829     final String hostAndPort = nodeName.substring(0,_offset);
830     try {
831       final String path = URLDecoder.decode(nodeName.substring(1+_offset), "UTF-8");
832       String urlScheme = (String) getClusterProps().get(URL_SCHEME);
833       if(urlScheme == null) {
834         urlScheme = "http";
835       }
836       return urlScheme + "://" + hostAndPort + (path.isEmpty() ? "" : ("/" + path));
837     } catch (UnsupportedEncodingException e) {
838       throw new IllegalStateException("JVM Does not seem to support UTF-8", e);
839     }
840   }
841 
842   /** Watches a single collection's format2 state.json. */
843   class StateWatcher implements Watcher {
844     private final String coll;
845 
846     StateWatcher(String coll) {
847       this.coll = coll;
848     }
849 
850     @Override
851     public void process(WatchedEvent event) {
852       if (!interestingCollections.contains(coll)) {
853         // This collection is no longer interesting, stop watching.
854         log.info("Uninteresting collection {}", coll);
855         return;
856       }
857 
858       // session events are not change events,
859       // and do not remove the watcher
860       if (EventType.None.equals(event.getType())) {
861         return;
862       }
863 
864       log.info("A cluster state change: {} for collection {} has occurred - updating... (live nodes size: {})",
865               (event), coll, ZkStateReader.this.clusterState == null ? 0
866                       : ZkStateReader.this.clusterState.getLiveNodes().size());
867 
868       refreshAndWatch(true);
869       synchronized (getUpdateLock()) {
870         constructState();
871       }
872     }
873 
874     /**
875      * Refresh collection state from ZK and leave a watch for future changes.
876      * As a side effect, updates {@link #clusterState} and {@link #watchedCollectionStates}
877      * with the results of the refresh.
878      *
879      * @param expectExists if true, error if no state node exists
880      */
881     public void refreshAndWatch(boolean expectExists) {
882       try {
883         DocCollection newState = fetchCollectionState(coll, this);
884         updateWatchedCollection(coll, newState);
885       } catch (KeeperException.NoNodeException e) {
886         if (expectExists) {
887           log.warn("State node vanished for collection: " + coll, e);
888         }
889       } catch (KeeperException e) {
890         if (e.code() == KeeperException.Code.SESSIONEXPIRED
891                 || e.code() == KeeperException.Code.CONNECTIONLOSS) {
892           log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
893           return;
894         }
895         log.error("Unwatched collection: " + coll, e);
896         throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "", e);
897 
898       } catch (InterruptedException e) {
899         Thread.currentThread().interrupt();
900         log.error("Unwatched collection :" + coll, e);
901       }
902     }
903   }
904 
905   /** Watches the legacy clusterstate.json. */
906   class LegacyClusterStateWatcher implements Watcher {
907 
908     @Override
909     public void process(WatchedEvent event) {
910       // session events are not change events,
911       // and do not remove the watcher
912       if (EventType.None.equals(event.getType())) {
913         return;
914       }
915       log.info("A cluster state change: {}, has occurred - updating... (live nodes size: {})", (event), ZkStateReader.this.clusterState == null ? 0 : ZkStateReader.this.clusterState.getLiveNodes().size());
916       refreshAndWatch();
917       synchronized (getUpdateLock()) {
918         constructState();
919       }
920     }
921 
922     /** Must hold {@link #getUpdateLock()} before calling this method. */
923     public void refreshAndWatch() {
924       try {
925         refreshLegacyClusterState(this);
926       } catch (KeeperException.NoNodeException e) {
927         throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
928                 "Cannot connect to cluster at " + zkClient.getZkServerAddress() + ": cluster not found/not ready");
929       } catch (KeeperException e) {
930         if (e.code() == KeeperException.Code.SESSIONEXPIRED
931                 || e.code() == KeeperException.Code.CONNECTIONLOSS) {
932           log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
933           return;
934         }
935         log.error("", e);
936         throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
937                 "", e);
938       } catch (InterruptedException e) {
939         // Restore the interrupted status
940         Thread.currentThread().interrupt();
941         log.warn("", e);
942       }
943     }
944   }
945 
946   /** Watches /collections children . */
947   class CollectionsChildWatcher implements Watcher {
948 
949     @Override
950     public void process(WatchedEvent event) {
951       // session events are not change events,
952       // and do not remove the watcher
953       if (EventType.None.equals(event.getType())) {
954         return;
955       }
956       log.info("A collections change: {}, has occurred - updating...", (event));
957       refreshAndWatch();
958       synchronized (getUpdateLock()) {
959         constructState();
960       }
961     }
962 
963     /** Must hold {@link #getUpdateLock()} before calling this method. */
964     public void refreshAndWatch() {
965       try {
966         refreshCollectionList(this);
967       } catch (KeeperException e) {
968         if (e.code() == KeeperException.Code.SESSIONEXPIRED
969             || e.code() == KeeperException.Code.CONNECTIONLOSS) {
970           log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
971           return;
972         }
973         log.error("", e);
974         throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
975             "", e);
976       } catch (InterruptedException e) {
977         // Restore the interrupted status
978         Thread.currentThread().interrupt();
979         log.warn("", e);
980       }
981     }
982   }
983 
984   /** Watches the live_nodes and syncs changes. */
985   class LiveNodeWatcher implements Watcher {
986 
987     @Override
988     public void process(WatchedEvent event) {
989       // session events are not change events,
990       // and do not remove the watcher
991       if (EventType.None.equals(event.getType())) {
992         return;
993       }
994       log.info("A live node change: {}, has occurred - updating... (live nodes size: {})", (event), liveNodes.size());
995       refreshAndWatch();
996     }
997 
998     public void refreshAndWatch() {
999       try {
1000         refreshLiveNodes(this);
1001       } catch (KeeperException e) {
1002         if (e.code() == KeeperException.Code.SESSIONEXPIRED
1003             || e.code() == KeeperException.Code.CONNECTIONLOSS) {
1004           log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
1005           return;
1006         }
1007         log.error("", e);
1008         throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
1009             "", e);
1010       } catch (InterruptedException e) {
1011         // Restore the interrupted status
1012         Thread.currentThread().interrupt();
1013         log.warn("", e);
1014       }
1015     }
1016   }
1017 
1018   public static DocCollection getCollectionLive(ZkStateReader zkStateReader,
1019       String coll) {
1020     try {
1021       return zkStateReader.fetchCollectionState(coll, null);
1022     } catch (KeeperException e) {
1023       throw new SolrException(ErrorCode.BAD_REQUEST,
1024           "Could not load collection from ZK:" + coll, e);
1025     } catch (InterruptedException e) {
1026       Thread.currentThread().interrupt();
1027       throw new SolrException(ErrorCode.BAD_REQUEST,
1028           "Could not load collection from ZK:" + coll, e);
1029     }
1030   }
1031 
1032   private DocCollection fetchCollectionState(String coll, Watcher watcher) throws KeeperException, InterruptedException {
1033     String collectionPath = getCollectionPath(coll);
1034     try {
1035       Stat stat = new Stat();
1036       byte[] data = zkClient.getData(collectionPath, watcher, stat, true);
1037       ClusterState state = ClusterState.load(stat.getVersion(), data,
1038               Collections.<String>emptySet(), collectionPath);
1039       ClusterState.CollectionRef collectionRef = state.getCollectionStates().get(coll);
1040       return collectionRef == null ? null : collectionRef.get();
1041     } catch (KeeperException.NoNodeException e) {
1042       return null;
1043     }
1044   }
1045 
1046   public static String getCollectionPath(String coll) {
1047     return COLLECTIONS_ZKNODE+"/"+coll + "/state.json";
1048   }
1049 
1050   public void addCollectionWatch(String coll) throws KeeperException, InterruptedException {
1051     if (interestingCollections.add(coll)) {
1052       log.info("addZkWatch {}", coll);
1053       new StateWatcher(coll).refreshAndWatch(false);
1054       synchronized (getUpdateLock()) {
1055         constructState();
1056       }
1057     }
1058   }
1059 
1060   private void updateWatchedCollection(String coll, DocCollection newState) {
1061     if (newState == null) {
1062       log.info("Deleting data for {}", coll);
1063       watchedCollectionStates.remove(coll);
1064       return;
1065     }
1066 
1067     // CAS update loop
1068     while (true) {
1069       if (!interestingCollections.contains(coll)) {
1070         break;
1071       }
1072       DocCollection oldState = watchedCollectionStates.get(coll);
1073       if (oldState == null) {
1074         if (watchedCollectionStates.putIfAbsent(coll, newState) == null) {
1075           log.info("Add data for {} ver {} ", coll, newState.getZNodeVersion());
1076           break;
1077         }
1078       } else {
1079         if (oldState.getZNodeVersion() >= newState.getZNodeVersion()) {
1080           // Nothing to do, someone else updated same or newer.
1081           break;
1082         }
1083         if (watchedCollectionStates.replace(coll, oldState, newState)) {
1084           log.info("Updating data for {} from {} to {} ", coll, oldState.getZNodeVersion(), newState.getZNodeVersion());
1085           break;
1086         }
1087       }
1088     }
1089 
1090     // Resolve race with removeZKWatch.
1091     if (!interestingCollections.contains(coll)) {
1092       watchedCollectionStates.remove(coll);
1093       log.info("Removing uninteresting collection {}", coll);
1094     }
1095   }
1096   
1097   /** This is not a public API. Only used by ZkController */
1098   public void removeZKWatch(String coll) {
1099     log.info("Removing watch for uninteresting collection {}", coll);
1100     interestingCollections.remove(coll);
1101     watchedCollectionStates.remove(coll);
1102     lazyCollectionStates.put(coll, new LazyCollectionRef(coll));
1103     synchronized (getUpdateLock()) {
1104       constructState();
1105     }
1106   }
1107 
1108   public static class ConfigData {
1109     public Map<String, Object> data;
1110     public int version;
1111 
1112     public ConfigData() {
1113     }
1114 
1115     public ConfigData(Map<String, Object> data, int version) {
1116       this.data = data;
1117       this.version = version;
1118 
1119     }
1120   }
1121 }